package com.migrate.module.service;

import cn.hutool.core.collection.CollUtil;
import com.migrate.module.config.ApplicationContextUtil;
import com.migrate.module.domain.BinlogData;
import com.migrate.module.domain.EtlBinlogConsumeRecord;
import com.migrate.module.enumeration.BinlogType;
import com.migrate.module.enumeration.ConsumerStatus;
import com.migrate.module.mapper.migrate.EtlBinlogConsumeRecordMapper;
import com.migrate.module.migrate.LocalQueue;
import com.migrate.module.util.BinlogUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
 * binlog消息拉取任务（只拉不提交）
 *
 * @author zhonghuashishan
 */
@Slf4j
public class CanalPullRunner implements Runnable
{
    /**
     * 消息主题
     */
    private final String topic;
    /**
     * rocketmq的nameServer地址
     */
    private final String nameServerUrl;
    /**
     * binlog消息同步消费记录表Mapper
     */
    private final EtlBinlogConsumeRecordMapper consumeRecordMapper;

    /**
     * 消息拉取任务构造方法
     * @param topic 消息主题
     * @param nameServerUrl rocketmq的nameServer地址
     */
    public CanalPullRunner(String topic, String nameServerUrl)
    {
        this.topic = topic;
        this.nameServerUrl = nameServerUrl;
        this.consumeRecordMapper = ApplicationContextUtil.getBean(EtlBinlogConsumeRecordMapper.class);
    }

    @Override
    public void run()
    {
        pullRun();
    }

    /**
     * 执行消息拉取
     */
    private void pullRun ()
    {
        try
        {
            DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("binlogPullConsumer");
            litePullConsumer.setAutoCommit(false);
            litePullConsumer.setNamesrvAddr(nameServerUrl);
            litePullConsumer.subscribe(topic, "*");
            litePullConsumer.start();
            try {
                while (true) {
                    // 拉取未消费消息
                    List<MessageExt> messageExts = litePullConsumer.poll();
                    if (CollUtil.isNotEmpty(messageExts))
                    {
                        for (MessageExt messageExt : messageExts)
                        {
                            byte[] body = messageExt.getBody();
                            String msg = new String(body);
                            // 记录queueId和offset
                            int queueId = messageExt.getQueueId();
                            long offset = messageExt.getQueueOffset();
                            // 判断该消息是否已经存在消费记录，如果存在则跳过执行
                            EtlBinlogConsumeRecord existsRecord = consumeRecordMapper.getExistsRecord(queueId, offset);
                            if (null == existsRecord)
                            {
                                // 新增消费记录
                                processNewMsg(messageExt, msg);
                            }
                            else
                            {
                                // 处理已经存在的消费记录
                                proccessExistsRecord(litePullConsumer, msg, existsRecord);

                            }
                        }
                    }
                    else
                    {
                        Thread.sleep(5000);
                    }
                }
            } finally {
                litePullConsumer.shutdown();
            }
        }
        catch (InterruptedException | MQClientException e)
        {
            try
            {
                // 假设要拉取消息的主题还不存在，则会抛出异常，这种情况下休眠五秒再重试
                Thread.sleep(5000);
                pullRun ();
            }
            catch (InterruptedException ignored)
            {

            }
        }
    }

    /**
     * 处理新的消息
     * @param messageExt mq消息对象
     * @param msg 消息内容
     */
    private void processNewMsg(MessageExt messageExt, String msg)
    {
        try
        {
            BinlogData binlogData = BinlogUtils.getBinlogDataMap(msg);
            if (Objects.isNull(binlogData)){
                return;
            }
            Boolean targetOperateType = BinlogType.INSERT.getValue().equals(binlogData.getOperateType())
                    || BinlogType.DELETE.getValue().equals(binlogData.getOperateType())
                    || BinlogType.UPDATE.getValue().equals(binlogData.getOperateType());
            if (!targetOperateType  || null == binlogData.getDataMap() ){
                return;
            }

            EtlBinlogConsumeRecord consumeRecord = new EtlBinlogConsumeRecord();
            consumeRecord.setQueueId(messageExt.getQueueId());
            consumeRecord.setOffset(messageExt.getQueueOffset());
            consumeRecord.setTopic(messageExt.getTopic());
            consumeRecord.setBrokerName(messageExt.getBrokerName());
            consumeRecord.setConsumeStatus(ConsumerStatus.NOT_CONSUME.getValue());
            consumeRecord.setCreateTime(new Date());
            consumeRecordMapper.insert(consumeRecord);
            LocalQueue.getInstance().submit(binlogData, consumeRecord);
        }
        catch (Exception e)
        {
            log.error("新增消费记录失败", e);
        }
    }

    /**
     * 处理已经存在的消费记录
     * @param litePullConsumer mq消费者
     * @param msg 消息内容
     * @param existsRecord 已经存在的消费记录
     */
    private void proccessExistsRecord(DefaultLitePullConsumer litePullConsumer, String msg, EtlBinlogConsumeRecord existsRecord)
    {
        // 已经存在的消费记录状态为已提交，说明mq里的对应消息修改提交状态失败了
        // （rocketmq源码里手动提交消息时，如果失败了只会记录日志不会抛出异常，因此这里必须再次尝试提交消息防止mq中未处理的消息和实际情况不符）
        try
        {
            if (ConsumerStatus.COMMITTED.getValue().equals(existsRecord.getConsumeStatus()))
            {
                litePullConsumer.seek(new MessageQueue(existsRecord.getTopic(), existsRecord.getBrokerName(), existsRecord.getQueueId()), existsRecord.getOffset());
                //这一步必须，不然手动提交的东西不对
                List<MessageExt> committedFaildmessageExts = litePullConsumer.poll();
                // 再次提交已消费的消息
                litePullConsumer.commitSync();
            }
            else
            {
                BinlogData binlogData = BinlogUtils.getBinlogDataMap(msg);
                if (null == binlogData){
                    return;
                }
                LocalQueue.getInstance().submit(binlogData, existsRecord);
            }
        }
        catch (Exception e)
        {
            log.error("消息重新消费失败", e);
        }
    }
}
